package com.avris.tool.util;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @author Jast
 * @description Kafka consumer-group offset utilities: inspect a group's committed,
 * earliest and latest offsets for a topic, and reset the group's offsets to the
 * first message at or after a given timestamp.
 * @date 2022-09-13 15:29
 */
public class ConsumerUtil {

    /** Utility class — static methods only, not meant to be instantiated. */
    private ConsumerUtil() {
    }

    /**
     * Builds the Kafka consumer configuration.
     *
     * @param brokerList comma-separated broker addresses ({@code bootstrap.servers})
     * @param groupID    consumer group id
     * @return consumer {@link Properties} with auto-commit disabled and String (de)serialization
     */
    private static Properties getProperties(String brokerList, String groupID) {
        Properties consumerProperties = new Properties();
        // Offsets are committed explicitly by the callers, never automatically.
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // limit each poll to a single record
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        return consumerProperties;
    }

    /**
     * Lists all {@link TopicPartition}s of {@code topic} as seen by the given consumer.
     */
    private static List<TopicPartition> topicPartitions(KafkaConsumer<String, String> consumer, String topic) {
        return consumer.partitionsFor(topic).stream()
                .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                .collect(Collectors.toList());
    }

    /**
     * Prints, for every partition of {@code topic}: the group's committed offset,
     * the latest (end) offset and the earliest (beginning) offset.
     *
     * @param brokerList comma-separated broker addresses
     * @param topic      topic to inspect
     * @param groupID    consumer group whose committed offsets are shown
     */
    public static void getOffset(String brokerList, String topic, String groupID) {
        Properties consumerProperties = getProperties(brokerList, groupID);

        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {

            consumer.subscribe(Collections.singleton(topic));
            print(topic, consumer, "当前消费信息，topic:");

            // Build the partition list once and reuse it for both offset lookups.
            List<TopicPartition> partitions = topicPartitions(consumer, topic);

            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            endOffsets.forEach((tp, offset) ->
                    System.out.println("最新 Partition:" + tp.partition() + ",offset:" + offset));

            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            beginningOffsets.forEach((tp, offset) ->
                    System.out.println("最早 Partition:" + tp.partition() + ",offset:" + offset));
        }
    }

    /**
     * Resets the consumer group's offsets to the first message at or after the given
     * timestamp, then commits the new positions.
     *
     * @param brokerList comma-separated broker addresses
     * @param groupID    consumer group whose offsets are reset
     * @param topic      topic whose partitions are reset
     * @param ts         target timestamp in epoch milliseconds
     */
    public static void setOffsetTime(String brokerList, String groupID, String topic, Long ts) {
        Properties consumerProperties = getProperties(brokerList, groupID);

        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singleton(topic));
            // Triggers group join / partition assignment so seek() has assigned partitions.
            // NOTE(review): a single poll(0) does not guarantee assignment has completed — verify in practice.
            consumer.poll(0);
            Map<TopicPartition, Long> timeToSearch =
                    topicPartitions(consumer, topic)
                            .stream()
                            .collect(Collectors.toMap(Function.identity(), tp -> ts));
            System.out.println("timeToSearch:" + timeToSearch);
            boolean seeked = false;
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
                    consumer.offsetsForTimes(timeToSearch).entrySet()) {
                // offsetsForTimes maps a partition to null when it has no message at/after ts;
                // the original code threw NPE here — skip such partitions instead.
                OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
                if (offsetAndTimestamp == null) {
                    continue;
                }
                System.out.println(entry.getKey() + " " + offsetAndTimestamp.offset());
                consumer.seek(entry.getKey(), offsetAndTimestamp.offset());
                seeked = true;
            }
            // Commit once after all seeks: commitSync() commits the current position of EVERY
            // assigned partition, so committing inside the loop would persist positions of
            // partitions that had not been seeked yet.
            if (seeked) {
                consumer.commitSync();
            }
        }
    }

    /**
     * Prints the group's committed offset for every partition of {@code topic}.
     *
     * @param topic    topic to inspect
     * @param consumer open consumer belonging to the group of interest
     * @param s        message prefix printed before each partition number
     */
    private static void print(String topic, KafkaConsumer<String, String> consumer, String s) {
        try {
            consumer.partitionsFor(topic).forEach(partitionInfo -> {
                // committed() returns null when the group has never committed for this
                // partition; the original relied on the resulting NPE plus a broad catch.
                OffsetAndMetadata committed =
                        consumer.committed(new TopicPartition(topic, partitionInfo.partition()));
                if (committed == null) {
                    System.out.println("当前消费者组未消费过数据");
                } else {
                    System.out.println(s + partitionInfo.partition() + "\toffset:" + committed.offset());
                }
            });
        } catch (Exception e) {
            // Best-effort diagnostics: metadata lookup failures must not abort the caller.
            System.out.println("当前消费者组未消费过数据");
        }
    }
}
